package jupitermouse.site.sql

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._


/**
  * Demonstrates converting between RDD and DataFrame two ways:
  * by case-class reflection ([[reflection]]) and by building a
  * schema programmatically ([[program]]).
  */
object DataFrameRDDApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DataFrameRDDApp")
      .master("local[4]")
      .config("spark.driver.host", "localhost")
      .getOrCreate()

    // Always stop the session, even if one of the demo jobs fails.
    try {
      printDF(spark, reflection(spark))
      printDF(spark, program(spark))
    } finally {
      spark.stop()
    }
  }

  /**
    * RDD ==> DataFrame via case-class reflection.
    *
    * @param spark active SparkSession
    * @param path  input text file of "name,age" lines; the default is the
    *              original placeholder, kept for backward compatibility —
    *              callers should pass a real path
    * @return DataFrame whose schema is inferred from [[Person]]
    */
  def reflection(spark: SparkSession, path: String = "file://"): DataFrame = {
    // Create the source RDD from a text file.
    val rdd = spark.sparkContext.textFile(path)

    // Required for the implicit .toDF() conversion on an RDD of case classes.
    import spark.implicits._

    // Each line is expected to be "name,age"; a malformed line (missing
    // field or non-numeric age) will fail when the job actually runs.
    rdd.map(_.split(","))
      .map(fields => Person(fields(0), fields(1).toInt))
      .toDF()
  }

  /**
    * RDD ==> DataFrame via an explicitly constructed schema.
    *
    * @param spark active SparkSession
    * @param path  input text file of "name,age" lines; the default is the
    *              original placeholder, kept for backward compatibility
    * @return DataFrame with columns (name: String, age: Int)
    */
  def program(spark: SparkSession, path: String = ""): DataFrame = {
    val rdd = spark.sparkContext.textFile(path)

    // Map raw text lines onto Rows matching the schema below.
    val rowRDD = rdd.map(_.split(","))
      .map(fields => Row(fields(0), fields(1).toInt))

    // Both fields explicitly nullable; the original mixed an explicit
    // `true` with the (identical) default, which read as inconsistent.
    val schema = StructType(List(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)
    ))

    spark.createDataFrame(rowRDD, schema)
  }

  /**
    * Prints a DataFrame using both the DataFrame API and Spark SQL.
    *
    * @param spark    active SparkSession (used to execute the SQL query)
    * @param personDF DataFrame expected to have an "age" column
    */
  def printDF(spark: SparkSession, personDF: DataFrame): Unit = {
    // DataFrame API
    personDF.show()
    personDF.filter(personDF.col("age") > 30).show()

    // SQL API: register a temporary view, then query it.
    personDF.createOrReplaceTempView("person")

    // Fix: the original discarded the query's DataFrame, so the SQL was
    // never executed or displayed; show() forces execution and prints it.
    spark.sql("select * from person where age > 20").show()
  }

  /** Row model used by the reflection-based conversion. */
  final case class Person(name: String, age: Int)
}
